package cn.zhangfusheng.elasticsearch.transfer;

import cn.zhangfusheng.elasticsearch.annotation.ElasticSearchConfig;
import cn.zhangfusheng.elasticsearch.annotation.document.IndexTransfer;
import cn.zhangfusheng.elasticsearch.constant.ElasticSearchConstant;
import cn.zhangfusheng.elasticsearch.constant.enumeration.TransferType;
import cn.zhangfusheng.elasticsearch.cycle.CreateMappingBefore;
import cn.zhangfusheng.elasticsearch.cycle.CreateMappingEnd;
import cn.zhangfusheng.elasticsearch.exception.GlobalSystemException;
import cn.zhangfusheng.elasticsearch.exception.InitRepositoryException;
import cn.zhangfusheng.elasticsearch.model.annotation.DefaultElasticSearchConfig;
import cn.zhangfusheng.elasticsearch.model.page.PageRequest;
import cn.zhangfusheng.elasticsearch.scan.ElasticSearchEntityRepositoryDetail;
import cn.zhangfusheng.elasticsearch.template.ElasticSearchRestTemplate;
import cn.zhangfusheng.elasticsearch.thread.ThreadLocalDetail;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * @author fusheng.zhang
 * @date 2022-02-25 10:56:16
 */
@Slf4j
public class TransferOperation {

    private final ElasticSearchEntityRepositoryDetail entityRepositoryDetail;
    private final ElasticSearchRestTemplate elasticSearchRestTemplate;
    private final ElasticSearchConfig elasticSearchConfig;

    public TransferOperation(
            ElasticSearchEntityRepositoryDetail entityRepositoryDetail,
            ElasticSearchRestTemplate elasticSearchRestTemplate) {
        this.entityRepositoryDetail = entityRepositoryDetail;
        this.elasticSearchRestTemplate = elasticSearchRestTemplate;
        this.elasticSearchConfig = DefaultElasticSearchConfig.INSTANCE;
    }

    /**
     * 编辑文档 以及迁移数据
     * @return
     */
    public void operationDocumentAndTransferDb() {
        // 版本管理
        TransferInfo transferInfo = upgradeIndex();
        // 迁移数据
        if (BooleanUtils.isTrue(transferInfo.getTransfer())) {
            transferDb(transferInfo.getUpIndexName(), transferInfo.getIndexName());
        }
        // 变更别名
        if (BooleanUtils.isTrue(transferInfo.getUpdateAlias())) this.operationAlias();
        // 生成操作记录
        if (StringUtils.isBlank(transferInfo.getRowId())) {
            String rowId = UUIDs.randomBase64UUID();
            elasticSearchRestTemplate.index(
                    rowId, null, ElasticSearchConstant.TRANSFER_INDEX, transferInfo.xContentBuilder(rowId));
        }
    }

    /**
     * 升级索引,并获取数据迁移相关信息
     * 根据className 查询对应的版本控制数据
     * 如果无版本控制数据,则创建index和mapping
     * 如果索引版本号不一致,则升级索引和mapping,并迁移数据
     * 如果索引版本号一致,则更新mapping
     * @return
     */
    private TransferInfo upgradeIndex() {
        String entityClassName = entityRepositoryDetail.getEntityClass().getName();
        TransferInfo transferInfo = new TransferInfo().setClassName(entityClassName);
        // 查询关于该类的索引升级的最后十条数据
        List<SearchHit> searchHits = new ArrayList<>(PageRequest.DEFAULT.getSize());
        elasticSearchRestTemplate.searchWithPage(
                hs -> searchHits.addAll(Arrays.asList(hs)), PageRequest.DEFAULT,
                new Object[]{new TransferInfo().setClassName(entityClassName)},
                null, ElasticSearchConstant.TRANSFER_INDEX);
        // 未查询到关于该类的索引升级数据
        if (searchHits.isEmpty()) {
            boolean indexExists = this.elasticSearchRestTemplate.exists(new GetIndexRequest(entityRepositoryDetail.getIndexName()));
            if (indexExists) {
                throw new GlobalSystemException("index[{}] exists,you must delete index or upgrade index,and restart project", entityRepositoryDetail.getIndexName());
            }
            // 执行创建 mapping 前的函数
            this.createMappingBefore();
            // 创建索引
            elasticSearchRestTemplate.createIndexMapping(
                    entityRepositoryDetail.getIndexName(), entityRepositoryDetail.getMapping(), entityRepositoryDetail.getSettingJson());
            // 执行创建 mapping 后的声明周期
            this.createMappingEnd();
            // 不需要迁移数据
            return transferInfo.setTransfer(false).setUpdateAlias(true)
                    .setDesc(ElasticSearchConstant.DESC_CREATE_INDEX_MAPPING)
                    .setIndexName(entityRepositoryDetail.getIndexName())
                    .setVersion(entityRepositoryDetail.getIndexDescription().version())
                    .setUpgradeVersion(entityRepositoryDetail.getIndexDescription().upgradeVersion());
        } else {
            transferInfo.setIndexName(entityRepositoryDetail.getIndexName())
                    .setVersion(entityRepositoryDetail.getIndexDescription().version())
                    .setUpgradeVersion(entityRepositoryDetail.getIndexDescription().upgradeVersion());
            SearchHit searchHit = searchHits.get(searchHits.size() - 1);
            TransferInfo endTransferInfo = new TransferInfo(searchHit.getId(), searchHit.getSourceAsMap());

            if (!Objects.equals(transferInfo.getVersion(), endTransferInfo.getVersion())) {
                // 版本号不一致 升级索引 迁移数据
                transferInfo.setTransfer(true).setUpIndexName(endTransferInfo.getIndexName());
                // 创建索引
                elasticSearchRestTemplate.createIndexMapping(
                        transferInfo.getIndexName(), entityRepositoryDetail.getMapping(), entityRepositoryDetail.getSettingJson());
                //
                transferInfo.setTransfer(true).setUpdateAlias(true)
                        .setDesc(ElasticSearchConstant.DESC_CREATE_INDEX_MAPPING_AND_TRANSGER_DB)
                        .setUpIndexName(endTransferInfo.getIndexName());
            } else if (!Objects.equals(transferInfo.getUpgradeVersion(), endTransferInfo.getUpgradeVersion())) {
                // 版本号一致 升级编号不一致,更新索引
                // 更新 mapping
                elasticSearchRestTemplate.putMapping(transferInfo.getIndexName(), entityRepositoryDetail.getMapping());
                // 不需要迁移数据
                transferInfo.setTransfer(false).setDesc(ElasticSearchConstant.DESC_UPDATE_MAPPING);
            } else {
                transferInfo.setRowId(endTransferInfo.getRowId());
            }

            // 需要迁移数据 // 获取上一个索引
            IndexTransfer indexTransfer = entityRepositoryDetail.getIndexTransfer();
            if (Objects.nonNull(indexTransfer)) {
                // 版本迁移结果
                TransferInfo queryTransferInfo = new TransferInfo()
                        .setClassName(entityClassName).setTransferVersion(indexTransfer.upgradeVersion());
                List<SearchHit> transferSearchHits = elasticSearchRestTemplate.search(
                        new Object[]{queryTransferInfo}, null, ElasticSearchConstant.TRANSFER_INDEX);
                // 未执行过的迁移编号
                if (transferSearchHits.isEmpty()) {
                    transferInfo.setRowId(null).setTransfer(true)
                            .setUpIndexName(indexTransfer.sourceIndex()).setTransferVersion(indexTransfer.upgradeVersion());
                }
            }
        }
        return transferInfo;
    }

    /**
     * 索引别名处理
     */
    private void operationAlias() {
        String indexName = entityRepositoryDetail.getIndexName(), alias = entityRepositoryDetail.getAlias();
        boolean indexExistsAlias = elasticSearchRestTemplate.indexExistsAlias(alias);
        if (indexExistsAlias) elasticSearchRestTemplate.indexRemoveAlias(indexName, alias);
        elasticSearchRestTemplate.indexSetAlias(indexName, alias);
    }

    /**
     * 执行创建 mapping 后的声明周期
     */
    private void createMappingEnd() {
        Class<? extends CreateMappingEnd>[] createMappingEnd =
                entityRepositoryDetail.getIndexDescription().createMappingEnd();
        if (Objects.nonNull(createMappingEnd) && createMappingEnd.length > 0) {
            try {
                for (Class<? extends CreateMappingEnd> aClass : createMappingEnd) {
                    aClass.newInstance().end(elasticSearchRestTemplate);
                }
            } catch (InstantiationException | IllegalAccessException e) {
                throw new GlobalSystemException(e);
            }
        }
    }

    /**
     * 执行创建 mapping 前的函数
     */
    private void createMappingBefore() {
        // 生命周期: 创建索引前函数执行
        Class<? extends CreateMappingBefore>[] createMappingBefore =
                entityRepositoryDetail.getIndexDescription().createMappingBefore();
        if (Objects.nonNull(createMappingBefore) && createMappingBefore.length > 0) {
            try {
                for (Class<? extends CreateMappingBefore> c : createMappingBefore) {
                    c.newInstance().before(elasticSearchRestTemplate);
                }
            } catch (InstantiationException | IllegalAccessException e) {
                throw new GlobalSystemException(e);
            }
        }
    }

    /**
     * 数据迁移
     * @param fromIndex 来源 index
     * @param toIndex   目标 index
     */
    private void transferDb(String fromIndex, String toIndex) {
        try {
            TransferType transgerType = entityRepositoryDetail.getTransgerType();
            if (Objects.equals(transgerType, TransferType.DEFAULT)) {
                ThreadLocalDetail.start(this.elasticSearchConfig, RequestOptions.DEFAULT);
                SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
                SearchRequest searchRequest = new SearchRequest(fromIndex).source(searchSourceBuilder);
                SearchResponse searchResponse =
                        elasticSearchRestTemplate.search(searchRequest, hs -> bulkTransgerDb(fromIndex, toIndex, hs));
                log.debug("dbSize:{}", searchResponse.getHits().getTotalHits().value);
            } else {
                elasticSearchRestTemplate.reindex(toIndex, fromIndex);
            }
        } catch (Exception e) {
            throw new InitRepositoryException(e);
        }
    }


    /**
     * 批量迁移数据
     * @param fromIndex
     * @param toIndex
     * @param hits
     */
    private void bulkTransgerDb(String fromIndex, String toIndex, SearchHit[] hits) {
        try {
            BulkRequest bulkRequest = new BulkRequest();
            Arrays.stream(hits).forEach(h -> {
                Map<String, Object> sourceAsMap = h.getSourceAsMap();
                String routing = entityRepositoryDetail.routing(sourceAsMap);
                XContentBuilder xContentBuilder = entityRepositoryDetail.mapToXContentBuilder(sourceAsMap);
                IndexRequest indexRequest = new IndexRequest(toIndex)
                        .routing(routing).id(h.getId()).source(xContentBuilder);
                bulkRequest.add(indexRequest);
            });
            BulkResponse bulkResponse = elasticSearchRestTemplate.restHighLevelClient().bulk(bulkRequest, ThreadLocalDetail.requestOptions());
            if (bulkResponse.hasFailures()) {
                throw new GlobalSystemException(bulkResponse.buildFailureMessage());
            }
            log.debug("数据迁移:fromIndex:{}.toIndex:{}.dbSize:{}", fromIndex, toIndex, bulkRequest.numberOfActions());
        } catch (IOException e) {
            throw new GlobalSystemException(e);
        }
    }

}
